package com.hy.video.monitor.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hy.video.monitor.conf.DynamicTask;
import com.hy.video.monitor.conf.UserSetting;
import com.hy.video.monitor.gb28181.bean.SendRtpItem;
import com.hy.video.monitor.media.zlm.ZLMHttpHookSubscribe;
import com.hy.video.monitor.media.zlm.ZLMMediaListManager;
import com.hy.video.monitor.media.zlm.ZLMRTPServerFactory;
import com.hy.video.monitor.media.zlm.dto.MediaServerItem;
import com.hy.video.monitor.service.IMediaServerService;
import com.hy.video.monitor.service.bean.*;
import com.hy.video.monitor.storager.IRedisCatchStorage;
import com.hy.video.monitor.utils.redis.RedisUtil;
import com.hy.video.monitor.vmanager.bean.VMCResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;


/**
 * 监听下级发送推送信息，并发送国标推流消息上级
 * @author KangJonney
 */
@Component
public class RedisGbPlayMsgListener implements MessageListener {

    private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class);

    public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM";

    /**
     * 流媒体不存在的错误玛
     */
    public static final  int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1;

    /**
     * 离线的错误玛
     */
    public static final  int ERROR_CODE_OFFLINE = -2;

    /**
     * 超时的错误玛
     */
    public static final  int ERROR_CODE_TIMEOUT = -3;

    private Map<String, PlayMsgCallback> callbacks = new ConcurrentHashMap<>();
    private Map<String, PlayMsgCallbackForStartSendRtpStream> callbacksForStartSendRtpStream = new ConcurrentHashMap<>();
    private Map<String, PlayMsgErrorCallback> callbacksForError = new ConcurrentHashMap<>();

    @Autowired
    private UserSetting userSetting;

    @Autowired
    private RedisUtil redis;

    @Autowired
    private ZLMMediaListManager zlmMediaListManager;

    @Autowired
    private ZLMRTPServerFactory zlmrtpServerFactory;

    @Autowired
    private IMediaServerService mediaServerService;

    @Autowired
    private IRedisCatchStorage redisCatchStorage;

    @Autowired
    private DynamicTask dynamicTask;

    @Autowired
    private ZLMMediaListManager mediaListManager;

    @Autowired
    private ZLMHttpHookSubscribe subscribe;


    public interface PlayMsgCallback{
        void handler(ResponseSendItemMsg responseSendItemMsg);
    }

    public interface PlayMsgCallbackForStartSendRtpStream{
        void handler(JSONObject jsonObject);
    }

    public interface PlayMsgErrorCallback{
        void handler(VMCResult VMCResult);
    }

    @Override
    public void onMessage(Message message, byte[] bytes) {
        JSONObject msgJSON = JSON.parseObject(message.getBody(), JSONObject.class);
        VmcRedisMsg vmcRedisMsg = JSON.toJavaObject(msgJSON, VmcRedisMsg.class);
        if (!userSetting.getServerId().equals(vmcRedisMsg.getToId())) {
            return;
        }
        if (VmcRedisMsg.isRequest(vmcRedisMsg)) {
            logger.info("[收到REDIS通知] 请求： {}", new String(message.getBody()));

            switch (vmcRedisMsg.getCmd()){
                case VmcRedisMsgCmd.GET_SEND_ITEM:
                    RequestSendItemMsg content = JSON.toJavaObject((JSONObject) vmcRedisMsg.getContent(), RequestSendItemMsg.class);
                    requestSendItemMsgHand(content, vmcRedisMsg.getFromId(), vmcRedisMsg.getSerial());
                    break;
                case VmcRedisMsgCmd.REQUEST_PUSH_STREAM:
                    RequestPushStreamMsg param = JSON.toJavaObject((JSONObject) vmcRedisMsg.getContent(), RequestPushStreamMsg.class);;
                    requestPushStreamMsgHand(param, vmcRedisMsg.getFromId(), vmcRedisMsg.getSerial());
                    break;
                default:
                    break;
            }

        }else {
            logger.info("[收到REDIS通知] 回复： {}", new String(message.getBody()));
            switch (vmcRedisMsg.getCmd()){
                case VmcRedisMsgCmd.GET_SEND_ITEM:

                    VMCResult content  = JSON.toJavaObject((JSONObject) vmcRedisMsg.getContent(), VMCResult.class);

                    String key = vmcRedisMsg.getSerial();
                    switch (content.getCode()) {
                        case 0:
                            ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class);
                            PlayMsgCallback playMsgCallback = callbacks.get(key);
                            if (playMsgCallback != null) {
                                callbacksForError.remove(key);
                                playMsgCallback.handler(responseSendItemMsg);
                            }
                            break;
                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
                        case ERROR_CODE_OFFLINE:
                        case ERROR_CODE_TIMEOUT:
                            PlayMsgErrorCallback errorCallback = callbacksForError.get(key);
                            if (errorCallback != null) {
                                callbacks.remove(key);
                                errorCallback.handler(content);
                            }
                            break;
                        default:
                            break;
                    }
                    break;
                case VmcRedisMsgCmd.REQUEST_PUSH_STREAM:
                    VMCResult VMCResult  = JSON.toJavaObject((JSONObject) vmcRedisMsg.getContent(), VMCResult.class);
                    String serial = vmcRedisMsg.getSerial();
                    switch (VMCResult.getCode()) {
                        case 0:
                            JSONObject jsonObject = (JSONObject)VMCResult.getData();
                            PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial);
                            if (playMsgCallback != null) {
                                callbacksForError.remove(serial);
                                playMsgCallback.handler(jsonObject);
                            }
                            break;
                        case ERROR_CODE_MEDIA_SERVER_NOT_FOUND:
                        case ERROR_CODE_OFFLINE:
                        case ERROR_CODE_TIMEOUT:
                            PlayMsgErrorCallback errorCallback = callbacksForError.get(serial);
                            if (errorCallback != null) {
                                callbacks.remove(serial);
                                errorCallback.handler(VMCResult);
                            }
                            break;
                        default:
                            break;
                    }
                    break;
                default:
                    break;
            }
        }




    }

    /**
     * 处理收到的请求推流的请求
     */
    private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) {
        MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId());
        if (mediaInfo == null) {
            // TODO 回复错误
            return;
        }
        String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1";
        Map<String, Object> param = new HashMap<>();
        param.put("vhost","__defaultVhost__");
        param.put("app",requestPushStreamMsg.getApp());
        param.put("stream",requestPushStreamMsg.getStream());
        param.put("ssrc", requestPushStreamMsg.getSsrc());
        param.put("dst_url",requestPushStreamMsg.getIp());
        param.put("dst_port", requestPushStreamMsg.getPort());
        param.put("is_udp", is_Udp);
        param.put("src_port", requestPushStreamMsg.getSrcPort());
        param.put("pt", requestPushStreamMsg.getPt());
        param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0");
        param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0");
        JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
        // 回复消息
        responsePushStream(jsonObject, fromId, serial);
    }

    private void responsePushStream(JSONObject content, String toId, String serial) {

        VMCResult<JSONObject> result = new VMCResult<>();
        result.setCode(0);
        result.setData(content);

        VmcRedisMsg response = VmcRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
                VmcRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result);
        JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
        redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }

    /**
     * 处理收到的请求sendItem的请求
     */
    private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) {
        MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId());
        if (mediaServerItem == null) {
            logger.info("[回复推流信息] 流媒体{}不存在 ", content.getMediaServerId());

            VMCResult<SendRtpItem> result = new VMCResult<>();
            result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND);
            result.setMsg("流媒体不存在");

            VmcRedisMsg response = VmcRedisMsg.getResponseInstance(userSetting.getServerId(), toId,
                    VmcRedisMsgCmd.GET_SEND_ITEM, serial, result);

            JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
            redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
            return;
        }
        // 确定流是否在线
        boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream());
        if (streamReady) {
            logger.info("[回复推流信息]  {}/{}", content.getApp(), content.getStream());
            responseSendItem(mediaServerItem, content, toId, serial);
        }else {
            // 流已经离线
            // 发送redis消息以使设备上线
            logger.info("[ app={}, stream={} ]通道离线，发送redis信息控制设备开始推流",content.getApp(), content.getStream());

            String taskKey = UUID.randomUUID().toString();
            // 设置超时
            dynamicTask.startDelay(taskKey, ()->{
                logger.info("[ app={}, stream={} ] 等待设备开始推流超时", content.getApp(), content.getStream());
                VMCResult<SendRtpItem> result = new VMCResult<>();
                result.setCode(ERROR_CODE_TIMEOUT);
                VmcRedisMsg response = VmcRedisMsg.getResponseInstance(
                        userSetting.getServerId(), toId, VmcRedisMsgCmd.GET_SEND_ITEM, serial, result
                );
                JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
                redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
            }, userSetting.getPlatformPlayTimeout());

            // 添加订阅
            JSONObject subscribeKey = new JSONObject();
            subscribeKey.put("app", content.getApp());
            subscribeKey.put("stream", content.getStream());
            subscribeKey.put("regist", true);
            subscribeKey.put("schema", "rtmp");
            subscribeKey.put("mediaServerId", mediaServerItem.getId());
            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
                    (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                        dynamicTask.stop(taskKey);
                        responseSendItem(mediaServerItem, content, toId, serial);
                    });

            MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(),
                    content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(),
                    content.getMediaServerId());
            redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);

        }
    }

    /**
     * 将获取到的sendItem发送出去
     */
    private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) {
        SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(),
                content.getPort(), content.getSsrc(), content.getPlatformId(),
                content.getApp(), content.getStream(), content.getChannelId(),
                content.getTcp());

        VMCResult<ResponseSendItemMsg> result = new VMCResult<>();
        result.setCode(0);
        ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg();
        responseSendItemMsg.setSendRtpItem(sendRtpItem);
        responseSendItemMsg.setMediaServerItem(mediaServerItem);
        result.setData(responseSendItemMsg);

        VmcRedisMsg response = VmcRedisMsg.getResponseInstance(
                userSetting.getServerId(), toId, VmcRedisMsgCmd.GET_SEND_ITEM, serial, result
        );
        JSONObject jsonObject = (JSONObject)JSON.toJSON(response);
        redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }

    /**
     * 发送消息要求下级生成推流信息
     * @param serverId 下级服务ID
     * @param app 应用名
     * @param stream 流ID
     * @param ip 目标IP
     * @param port 目标端口
     * @param ssrc  ssrc
     * @param platformId 平台国标编号
     * @param channelId 通道ID
     * @param isTcp 是否使用TCP
     * @param callback 得到信息的回调
     */
    public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc,
                        String platformId, String channelId, boolean isTcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) {
        RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance(
                serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, platformName);
        requestSendItemMsg.setServerId(serverId);
        String key = UUID.randomUUID().toString();
        VmcRedisMsg redisMsg = VmcRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, VmcRedisMsgCmd.GET_SEND_ITEM,
                key, requestSendItemMsg);

        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
        logger.info("[请求推流SendItem] {}: {}", serverId, jsonObject);
        callbacks.put(key, callback);
        callbacksForError.put(key, errorCallback);
        dynamicTask.startDelay(key, ()->{
            callbacks.remove(key);
            callbacksForError.remove(key);
            VMCResult<Object> VMCResult = new VMCResult<>();
            VMCResult.setCode(ERROR_CODE_TIMEOUT);
            VMCResult.setMsg("timeout");
            errorCallback.handler(VMCResult);
        }, userSetting.getPlatformPlayTimeout());
        redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }

    /**
     * 发送请求推流的消息
     * @param param 推流参数
     * @param callback 回调
     */
    public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) {
        String key = UUID.randomUUID().toString();
        VmcRedisMsg redisMsg = VmcRedisMsg.getRequestInstance(userSetting.getServerId(), serverId,
                VmcRedisMsgCmd.REQUEST_PUSH_STREAM, key, param);

        JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg);
        logger.info("[REDIS 请求其他平台推流] {}: {}", serverId, jsonObject);
        dynamicTask.startDelay(key, ()->{
            callbacksForStartSendRtpStream.remove(key);
            callbacksForError.remove(key);
        }, userSetting.getPlatformPlayTimeout());
        callbacksForStartSendRtpStream.put(key, callback);
        callbacksForError.put(key, (VMCResult)->{
            logger.info("[REDIS 请求其他平台推流] 失败: {}", VMCResult.getMsg());
            callbacksForStartSendRtpStream.remove(key);
            callbacksForError.remove(key);
        });
        redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject);
    }
}
